File: //opt/alt/python37/lib/python3.7/site-packages/cllimits/clquota_lib.py
# -*- coding: utf-8 -*-
# clquota.py - module for interfacing with cl-quota utility for get/set user's quotas
#
# Copyright © Cloud Linux GmbH & Cloud Linux Software, Inc 2010-2019 All Rights Reserved
#
# Licensed under CLOUD LINUX LICENSE AGREEMENT
# http://cloudlinux.com/docs/LICENSE.TXT
from __future__ import absolute_import
import csv
from cllimits.lib import exec_utility
from clcommon.clexception import FormattedException
from clcommon.utils import run_command, ExternalProgramFailed
from cllimits.lib.utils import is_quota_supported, is_quota_active
class ClQuotaException(FormattedException):
pass
class ClQuota:
_CL_QUOTA_PATH = '/usr/bin/cl-quota'
_REPQUOTA_PATH = '/usr/sbin/repquota'
def __init__(self):
# inodes limits data
# uid --> (soft_limit, hard_limit)
self._inodes_limits = None
# Error flag
self._is_clquota_error = False
# Check if quota is supported
self._is_clquota_present = is_quota_supported(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)
# Check if quota is activated
self._is_clquota_activated = is_quota_active(self._CL_QUOTA_PATH, self._REPQUOTA_PATH)
def is_clquota_present(self):
"""
Get quota presence flag
:return: True/False - quotas present/not present
"""
return self._is_clquota_present
def is_clquota_activated(self):
"""
Get quota activated flag
:return: True/False - quotas present/not present
"""
return self._is_clquota_activated
def get_inodes_limits_by_uid(self, user_id):
"""
Retrive inodes limits by uid
:param user_id: Supplied uid
:return: cortege (soft_limit, hard_limit). (None, None) if governor not present or error
"""
# Load inodes data if need
self._load_info()
uid = str(user_id)
if uid in self._inodes_limits:
return self._inodes_limits[uid]
return self._inodes_limits['0']
def set_user_inodes_limits(self, uid, limits, force=False):
"""
Set inodes limits for user uid
:param: int uid: user id
:param: list limits: new inodes limits
:param: bool force: save limits if even they are equal to defaults
:return: None
"""
self._load_info()
if not isinstance(uid, int) or uid < 0:
exc_message = {'message': "User id '%(uid)s' isn't a positive integer",
'context': {'uid': uid}}
raise ClQuotaException(exc_message)
if limits == "unlimited":
limits = ["unlimited"] * 2
elif limits == "default":
limits = ["default"] * 2
else:
if type(limits) in [tuple, list]:
pass
elif len(limits.split(',')) == 2:
limits = limits.split(',')
else:
exc_message = {'message': "Limits %(limits)s aren't acceptable.",
'context': {'limits': limits}}
raise ClQuotaException(exc_message)
for l in limits:
try:
if l != "default":
l = int(l)
if l < 0:
raise ValueError()
except ValueError:
exc_message = {'message': "Limit value '%(limit)s' isn't a positive integer or string %(default)s",
'context': {'limit': l, 'default': 'default'}}
raise ClQuotaException(exc_message)
cmd = [self._CL_QUOTA_PATH,
'--user-id=%d' % uid,
'--soft=%s' % limits[0],
'--hard=%s' % limits[1]]
if force:
cmd.append("--force")
try:
run_command(cmd)
except ExternalProgramFailed as e:
raise ClQuotaException(str(e))
def set_user_inodes_limits_unlimited(self, uid):
"""
Set unlimited inodes limits for user uid
:param: int uid: user id
:return: None
"""
self.set_user_inodes_limits(uid, "unlimited")
def reset_user_inodes_limits(self, uid):
"""
Set default inodes limits for user uid
:param: int uid: user id
:return: None
"""
self.set_user_inodes_limits(uid, "default")
def _load_info(self):
"""
Loads users info from cl-quota
:return: None
"""
# Exit if cl-quota data already loaded
if self._inodes_limits is not None:
return
disabled_exc_message = {'message': "%(util)s is disabled",
'context': {'util': 'Quota'}}
# Exit if cl-quota not present or cl-quota error
if not self._is_clquota_present or not self._is_clquota_activated or self._is_clquota_error:
raise ClQuotaException(disabled_exc_message)
# Get all cl-quota limits and parse them to self._inodes_limits
ret_code, s_quota_limits = exec_utility(self._CL_QUOTA_PATH, ['--csv'])
if ret_code != 0:
# Error
self._is_clquota_error = True
raise ClQuotaException(disabled_exc_message)
reader = csv.reader(s_quota_limits.split('\n'), delimiter=',')
self._inodes_limits = dict()
for row in reader:
if 'ERROR' in row:
# Error
self._is_clquota_error = True
# Remove data dictionary
self._inodes_limits = None
raise ClQuotaException(disabled_exc_message)
# Pass header line
if 'id' in row:
continue
# Add limits to data dictionary
self._inodes_limits[row[0]] = (int(row[2]), int(row[3]))
if '0' not in self._inodes_limits:
# Error
self._is_clquota_error = True
# Remove data dictionary
self._inodes_limits = None
exc_message = {'message': "There is no %(what)s found in %(where)s",
'context': {'what': 'default settings', 'where': '%s output' % self._CL_QUOTA_PATH}}
raise ClQuotaException(exc_message)
def reset_inodes_limits(self):
"""
Reset inodes limits for all users to package limits
:return:
"""
if not self._is_clquota_present or not self._is_clquota_activated:
return
exec_utility(self._CL_QUOTA_PATH, ['--sync'])